CallbackGroup centric executor #2405

Draft
wants to merge 5 commits into base: rolling
Conversation

@jmachowinski (Contributor) commented Jan 16, 2024

This is a first version of a new executor that stores ready executables per callback group.
In doing so, this executor should show the same behavior per callback group as the SingleThreadedExecutor.

Current behavior of the SingleThreadedExecutor

Currently the SingleThreadedExecutor will collect all ready events by calling wait_for_work

Executor::wait_for_work(std::chrono::nanoseconds timeout)

This will clear the global internal event queue (within AllocatorMemoryStrategy) and repopulate it with ready events.

Afterwards, the SingleThreadedExecutor will process all elements from the global internal event queue
by calling

Executor::get_next_ready_executable(AnyExecutable & any_executable)

followed by

Executor::execute_any_executable(AnyExecutable & any_exec)

in a loop.

Note that get_next_ready_executable returns the ready events in a prioritized order, e.g. timers first.

If get_next_ready_executable returns false, the global event queue will be empty and wait_for_work will be called again.

If multiple events are ready at the time wait_for_work is called, this leads to a prioritized first-in-first-out pattern: events that become ready while processing happens are not added, because wait_for_work is not called again until all collected events have been processed.
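
For illustration, the loop described above boils down to roughly the following condensed sketch. This is not the literal rclcpp implementation; wait_for_work, get_next_ready_executable and execute_any_executable are protected members of rclcpp::Executor, so the sketch is written as if it were inside the executor:

// Condensed sketch of the SingleThreadedExecutor spin pattern described above.
void spin_sketch()
{
  while (rclcpp::ok(this->context_) && spinning.load()) {
    // Clear the global event queue and repopulate it with everything that is
    // currently ready (blocks until at least one entity is ready).
    wait_for_work(std::chrono::nanoseconds(-1));

    AnyExecutable any_executable;
    // Drain the collected events in prioritized FIFO order. Events that become
    // ready during execution are only picked up by the next wait_for_work call.
    while (get_next_ready_executable(any_executable)) {
      execute_any_executable(any_executable);
    }
  }
}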

Current behavior of the MultiThreadedExecutor

The MultiThreadedExecutor utilizes the same code as the SingleThreadedExecutor and also has a global internal event queue. It will also call wait_for_work

Executor::wait_for_work(std::chrono::nanoseconds timeout)

to clear and populate the internal queue.

Afterwards, multiple threads will call

Executor::get_next_ready_executable(AnyExecutable & any_executable)

on the queue and process the taken events by calling

Executor::execute_any_executable(AnyExecutable & any_exec)

If get_next_ready_executable returns false, wait_for_work will be called again. This also blocks all other threads from calling get_next_ready_executable until wait_for_work returns. At this point there is a subtle difference: the global event queue is not guaranteed to be empty.

The difference comes from the fact that get_next_ready_executable will not return any event that belongs to a mutually exclusive callback group that is marked as currently being executed (by another thread).

Therefore we often see a pattern like this:
wait_for_work is called, and multiple events of one callback group are ready and enqueued. One thread takes one event from the queue and starts processing it. Meanwhile a second thread comes by and calls get_next_ready_executable. As all ready events belong to a callback group that is currently executing, get_next_ready_executable returns false, and wait_for_work is called. This clears the global event queue.

To avoid a busy loop, wait_for_work masks the currently executing callback group from the wait set and blocks the thread.

After thread one has finished executing the event, it marks the callback group as ready again and triggers a guard condition that wakes up the thread blocked in wait_for_work.

Thread 2 will be woken up by the guard condition and collect all ready events. Usually there will be no other event ready at this point, and get_next_ready_executable will return false, as wait_for_work cleared the event queue.
Afterwards thread 2 will call wait_for_work again, this time with the callback group added to the waitset, as it is now ready. As events are still pending in the callback group that was masked before, rcl_wait will return instantly. Note that if new events became ready in the meantime, they will also be added to the event queue. Thread 2 will then take the highest-priority event and execute it.

The result is a different pattern of execution of the events, leaning hard towards priority-first, especially in the case of events that take a long time to execute.

A really undesired behavior that we observed a lot happens if you have a timer that takes longer than its period.
In this case, only the timer will be executed, in an endless loop, while no subscriptions, services etc. will be processed anymore by the executor.
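
For illustration, a minimal hypothetical node that triggers this pattern with the current MultiThreadedExecutor: a 100 ms wall timer whose callback takes about 150 ms dominates the executor, and the subscription below can starve. The node, topic name and message type are chosen only for the example.

#include <chrono>
#include <thread>
#include <rclcpp/rclcpp.hpp>
#include <std_msgs/msg/string.hpp>

int main(int argc, char ** argv)
{
  rclcpp::init(argc, argv);
  auto node = std::make_shared<rclcpp::Node>("starvation_demo");

  // Timer period 100 ms, but the callback sleeps ~150 ms to simulate long work.
  auto timer = node->create_wall_timer(
    std::chrono::milliseconds(100),
    [] {std::this_thread::sleep_for(std::chrono::milliseconds(150));});

  // With the priority-first pattern described above, this callback may never
  // run even though messages keep arriving on "chatter".
  auto sub = node->create_subscription<std_msgs::msg::String>(
    "chatter", 10,
    [](std_msgs::msg::String::ConstSharedPtr /*msg*/) {});

  rclcpp::executors::MultiThreadedExecutor executor;
  executor.add_node(node);
  executor.spin();
  rclcpp::shutdown();
  return 0;
}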

A second undesired behavior comes up in combination with guard conditions. Guard conditions will wake up the wait_for_work function only once after each trigger. This leads to problems with Waitables that internally use guard conditions, as these need to be retriggered any time the Waitable is added to a waitset while there is still data available for processing. Bugs related to this have been observed with the interprocess communication, and a fix is currently being worked on (#2420).

A third point is that wait_for_work is expensive to call, and the current logic calls it multiple times without real need, thus increasing latency and CPU load.

Proposed solution

Instead of using one global event queue, have one event queue per callback group.

This leads to the same execution pattern per callback group as the SingleThreadedExecutor, thus avoiding the starvation issues we are seeing with the MultiThreadedExecutor.

As Waitables that internally use guard conditions will also be guaranteed to be executed after a guard condition trigger, this would also solve the issue of lost events and reduce the pitfalls of implementing Waitables.

Last, the CPU load and latency can be reduced, as we can save a lot of unneeded calls to wait_for_work.
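
As a sketch of what the per-group bookkeeping could look like (illustrative types only, not the actual classes in this PR):

#include <deque>
#include <rclcpp/rclcpp.hpp>

// One instance of this per callback group instead of one global event queue.
struct CallbackGroupState
{
  rclcpp::CallbackGroup::WeakPtr group;

  // Ready executables of this group, kept in the same priority buckets the
  // SingleThreadedExecutor uses (timers first, then subscriptions, ...).
  std::deque<rclcpp::TimerBase::SharedPtr> ready_timers;
  std::deque<rclcpp::SubscriptionBase::SharedPtr> ready_subscriptions;
  std::deque<rclcpp::ServiceBase::SharedPtr> ready_services;
  std::deque<rclcpp::ClientBase::SharedPtr> ready_clients;
  std::deque<rclcpp::Waitable::SharedPtr> ready_waitables;

  bool has_unprocessed_events() const
  {
    return !ready_timers.empty() || !ready_subscriptions.empty() ||
           !ready_services.empty() || !ready_clients.empty() ||
           !ready_waitables.empty();
  }
};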

Proposed Implementation

In order to be able to queue ready events by their callback group, we need a reverse mapping from the rcl events in the waitset to the rclcpp executable. This implementation achieves this by two extensions to rcl:

  • Per rcl event (rcl_client_s, rcl_subscription_s etc.) a user_data void pointer is added
  • rcl_wait_set_s is extended with counters of added events, e.g. nr_of_added_subscriptions etc.

While adding the rcl events to the waitset, the user_data is set to a pointer to a std::variant.
The std::variant contains the originating queue of the event and the type of the event.
This is needed, as the source of every event is either a [Subscription, Timer, Client, Service] or an rclcpp::Waitable.
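
A possible shape for that variant (names illustrative; CallbackGroupState is the per-group queue sketched under "Proposed solution", and the user_data field name is the proposed rcl extension):

#include <variant>

// Each alternative stores a weak reference to the originating rclcpp entity
// plus a pointer to the per-callback-group queue it should be enqueued into.
struct TimerRef {rclcpp::TimerBase::WeakPtr timer; CallbackGroupState * queue;};
struct SubscriptionRef {rclcpp::SubscriptionBase::WeakPtr subscription; CallbackGroupState * queue;};
struct ClientRef {rclcpp::ClientBase::WeakPtr client; CallbackGroupState * queue;};
struct ServiceRef {rclcpp::ServiceBase::WeakPtr service; CallbackGroupState * queue;};
struct WaitableRef {rclcpp::Waitable::WeakPtr waitable; CallbackGroupState * queue;};

using ExecutableRef =
  std::variant<TimerRef, SubscriptionRef, ClientRef, ServiceRef, WaitableRef>;

// While the waitset is built, the new user_data field of each rcl handle is
// set to point at the ExecutableRef describing it, e.g.
//   rcl_subscription->user_data = &executable_ref;   // field name hypothetical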

For timers, subscriptions, clients and services, adding the mapping is straightforward.

As the executor has no way of knowing a priori what a Waitable will add, it saves the count of the elements in the waitset before calling add_to_wait_set and computes the added elements from the count difference after the call. Afterwards, the user_data pointer of every newly added element is set to a variant pointing to this Waitable.

After the call to rcl_wait, all ready events are collected and added to their originating event queue. Note that a Waitable will only be added once, even if multiple of its rcl events trigger.
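
A sketch of the count-difference bookkeeping for Waitables. It relies on the proposed nr_of_added_* counters and user_data field; the helper set_user_data and the exact add_to_wait_set signature are illustrative only:

// Called while building the waitset, once per Waitable.
void add_waitable_sketch(
  rcl_wait_set_t & wait_set,
  const rclcpp::Waitable::SharedPtr & waitable,
  ExecutableRef * ref)
{
  // Snapshot the counters before the Waitable adds its rcl entities.
  const size_t subs_before = wait_set.nr_of_added_subscriptions;
  const size_t guards_before = wait_set.nr_of_added_guard_conditions;
  // ... (same for timers, clients, services, events)

  waitable->add_to_wait_set(&wait_set);

  // Everything added between the two snapshots belongs to this Waitable,
  // so its user_data is pointed back to the Waitable's ExecutableRef.
  for (size_t i = subs_before; i < wait_set.nr_of_added_subscriptions; ++i) {
    set_user_data(wait_set.subscriptions[i], ref);
  }
  for (size_t i = guards_before; i < wait_set.nr_of_added_guard_conditions; ++i) {
    set_user_data(wait_set.guard_conditions[i], ref);
  }
}

// After rcl_wait: every non-null entry in the waitset is ready; follow its
// user_data back to the ExecutableRef and enqueue it in its group's queue,
// adding a Waitable at most once per cycle.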

wait_for_work

  • Housekeeping
    -- Sync added/removed nodes
    -- Regeneration of precomputed structures if needed
  • Adds all callback groups with no unprocessed events to the waitset
  • Calls rcl_wait
  • Collects all ready events
  • Calls notify_one for each newly ready callback group, to wake up worker threads (see the sketch below)
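
Put together, wait_for_work could look roughly like this (member and helper names are illustrative, not the actual code of this PR):

void CbgExecutor::wait_for_work_sketch(std::chrono::nanoseconds timeout)
{
  // Housekeeping: sync added/removed nodes, regenerate precomputed
  // structures (waitset layout, ExecutableRef tables) if needed.
  sync_nodes_and_callback_groups();

  // Only callback groups without unprocessed events go back into the
  // waitset; groups that still have queued work are left out.
  for (auto & state : callback_group_states_) {
    if (!state.has_unprocessed_events()) {
      add_callback_group_to_wait_set(state);
    }
  }

  rcl_wait(&wait_set_, timeout.count());

  // Map every ready rcl handle back to its callback group via user_data
  // and enqueue it there (a Waitable at most once).
  const size_t newly_ready_groups = collect_ready_events_into_group_queues();

  // Wake one worker thread per callback group that just became ready.
  for (size_t i = 0; i < newly_ready_groups; ++i) {
    work_available_cv_.notify_one();
  }
}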

get_next_ready_executable

This function is now thread-safe.
It returns events in priority order from the ready callback groups,
e.g. timers first, if any timer is ready in any ready callback group.
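
A sketch of that selection, under the same naming assumptions as the other sketches:

bool CbgExecutor::get_next_ready_executable_sketch(AnyExecutable & any_exec)
{
  std::lock_guard<std::mutex> lock(ready_mutex_);   // makes this call thread-safe
  // Priority order: timers, then subscriptions, services, clients, waitables.
  for (auto & state : callback_group_states_) {
    if (!state.ready_timers.empty()) {
      any_exec.timer = state.ready_timers.front();
      state.ready_timers.pop_front();
      return true;
    }
  }
  for (auto & state : callback_group_states_) {
    if (!state.ready_subscriptions.empty()) {
      any_exec.subscription = state.ready_subscriptions.front();
      state.ready_subscriptions.pop_front();
      return true;
    }
  }
  // ... same pattern for services, clients and waitables ...
  return false;
}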

spin loop

If a thread has no work, it will call
get_next_ready_executable

  • If work is available
    -- execute it
    -- if the executed event was the last one in its callback group
    --- wake up the thread blocked in wait_for_work
    -- continue
  • If no work is available, try to acquire the wait_for_work_mutex
    -- on success, run wait_for_work, then continue
    -- otherwise, block on a std::condition_variable

A sketch of this worker loop is shown below.
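
// Sketch of the per-thread worker loop (illustrative names; the real
// implementation in this PR may differ in detail).
void CbgExecutor::run_worker_sketch()
{
  while (rclcpp::ok() && spinning_.load()) {
    AnyExecutable any_exec;
    if (get_next_ready_executable(any_exec)) {  // thread-safe, priority ordered
      execute_any_executable(any_exec);
      if (!group_state_of(any_exec).has_unprocessed_events()) {
        // The last queued event of this group was just executed; the group can
        // be re-added to the waitset, so wake up the thread in wait_for_work.
        wait_for_work_guard_condition_.trigger();
      }
      continue;
    }

    // No work available: exactly one thread runs wait_for_work,
    // all other idle threads block on the condition variable.
    std::unique_lock<std::mutex> wait_lock(wait_for_work_mutex_, std::try_to_lock);
    if (wait_lock.owns_lock()) {
      wait_for_work_sketch(std::chrono::nanoseconds(-1));
    } else {
      std::unique_lock<std::mutex> cv_lock(work_available_mutex_);
      work_available_cv_.wait(cv_lock);
    }
  }
}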

Performance results

The performance of the executor with respect to the existing ones depends on the number of added entities.
In general it

  • is slightly slower than the StaticSingleThreadedExecutor for spinning
  • is considerably slower when the callback groups change (waitset rebuild)
  • is considerably faster than the SingleThreadedExecutor / MultiThreadedExecutor

On low numbers (10 Nodes, each 1 Sub and 1 Pub)

PerformanceTestExecutor/single_thread_executor_spin_some                                         42699 ns        42782 ns        16292
PerformanceTestExecutor/static_single_thread_executor_spin_some                                  22656 ns        22723 ns        30703
PerformanceTestExecutor/multi_thread_executor_spin_some                                          42745 ns        42820 ns        16310
PerformanceTestExecutor/cbg_executor_spin_some                                                   26589 ns        26685 ns        26161
PerformanceTestExecutor/single_thread_executor_wait_for_work                                     17207 ns        17137 ns        40609
PerformanceTestExecutor/multi_thread_executor_wait_for_work                                      17343 ns        17261 ns        40778
PerformanceTestExecutor/cbg_executor_wait_for_work                                                4247 ns         4183 ns       168615
PerformanceTestExecutor/single_thread_executor_wait_for_work_rebuild                             24565 ns        24161 ns        28606
PerformanceTestExecutor/multi_thread_executor_wait_for_work_rebuild                              24737 ns        24335 ns        28899
PerformanceTestExecutor/cbg_executor_wait_for_work_rebuild                                      282876 ns       279871 ns         2502
CascadedPerformanceTestExecutor/single_thread_executor_spin                                     173399 ns         3459 ns       100000
CascadedPerformanceTestExecutor/static_single_thread_executor_spin                               88461 ns         3729 ns       100000
CascadedPerformanceTestExecutor/multi_thread_executor_spin                                      186569 ns         2937 ns       100000
CascadedPerformanceTestExecutor/cbg_executor_spin                                                93951 ns         2263 ns       100000
PerformanceTestExecutorMultipleCallbackGroups/single_thread_executor_spin_some                   47279 ns        47357 ns        15353
PerformanceTestExecutorMultipleCallbackGroups/static_single_thread_executor_spin_some            22489 ns        22550 ns        30039
PerformanceTestExecutorMultipleCallbackGroups/multi_thread_executor_spin_some                    46441 ns        46527 ns        15106
PerformanceTestExecutorMultipleCallbackGroups/cbg_executor_spin_some                             28717 ns        28808 ns        24270

On high numbers (10 Nodes, each 10 Subs and 10 Pubs)

PerformanceTestExecutor/single_thread_executor_spin_some                                        476171 ns       476278 ns         1460
PerformanceTestExecutor/static_single_thread_executor_spin_some                                 155358 ns       155446 ns         4528
PerformanceTestExecutor/multi_thread_executor_spin_some                                         479539 ns       479620 ns         1454
PerformanceTestExecutor/cbg_executor_spin_some                                                  163037 ns       163165 ns         4320
PerformanceTestExecutor/single_thread_executor_wait_for_work                                     59881 ns        59814 ns        11639
PerformanceTestExecutor/multi_thread_executor_wait_for_work                                      60201 ns        60141 ns        11647
PerformanceTestExecutor/cbg_executor_wait_for_work                                               13684 ns        13562 ns        51640
PerformanceTestExecutor/single_thread_executor_wait_for_work_rebuild                             70280 ns        69854 ns        10043
PerformanceTestExecutor/multi_thread_executor_wait_for_work_rebuild                              70488 ns        70066 ns        10035
PerformanceTestExecutor/cbg_executor_wait_for_work_rebuild                                      296157 ns       295951 ns         2321
CascadedPerformanceTestExecutor/single_thread_executor_spin                                     629502 ns         3192 ns        10000
CascadedPerformanceTestExecutor/static_single_thread_executor_spin                              369457 ns         3498 ns        10000
CascadedPerformanceTestExecutor/multi_thread_executor_spin                                      652901 ns         3143 ns        10000
CascadedPerformanceTestExecutor/cbg_executor_spin                                               361602 ns         2369 ns        10000
PerformanceTestExecutorMultipleCallbackGroups/single_thread_executor_spin_some                  833350 ns       833456 ns          806
PerformanceTestExecutorMultipleCallbackGroups/static_single_thread_executor_spin_some           152967 ns       153077 ns         4547
PerformanceTestExecutorMultipleCallbackGroups/multi_thread_executor_spin_some                   862221 ns       862330 ns          797
PerformanceTestExecutorMultipleCallbackGroups/cbg_executor_spin_some                            188593 ns       188711 ns         3686

On absurdly high numbers (10 Nodes, each 30 Subs and 30 Pubs)

PerformanceTestExecutor/single_thread_executor_spin_some                                       2793936 ns      2794024 ns          248
PerformanceTestExecutor/static_single_thread_executor_spin_some                                 581873 ns       581948 ns         1174
PerformanceTestExecutor/multi_thread_executor_spin_some                                        2811405 ns      2811414 ns          248
PerformanceTestExecutor/cbg_executor_spin_some                                                  614209 ns       614256 ns         1106
PerformanceTestExecutor/single_thread_executor_wait_for_work                                    166914 ns       166823 ns         4194
PerformanceTestExecutor/multi_thread_executor_wait_for_work                                     166769 ns       166697 ns         4185
PerformanceTestExecutor/cbg_executor_wait_for_work                                               36011 ns        35882 ns        19464
PerformanceTestExecutor/single_thread_executor_wait_for_work_rebuild                            227946 ns       227469 ns         3001
PerformanceTestExecutor/multi_thread_executor_wait_for_work_rebuild                             233806 ns       233300 ns         3035
PerformanceTestExecutor/cbg_executor_wait_for_work_rebuild                                      409781 ns       409542 ns         1699
CascadedPerformanceTestExecutor/single_thread_executor_spin                                    1767707 ns         3736 ns        10000
CascadedPerformanceTestExecutor/static_single_thread_executor_spin                             1133730 ns         3582 ns        10000
CascadedPerformanceTestExecutor/multi_thread_executor_spin                                     1784937 ns         3222 ns        10000
CascadedPerformanceTestExecutor/cbg_executor_spin                                              1072787 ns         3550 ns        10000
PerformanceTestExecutorMultipleCallbackGroups/single_thread_executor_spin_some                 6043592 ns      6043685 ns          112
PerformanceTestExecutorMultipleCallbackGroups/static_single_thread_executor_spin_some           580017 ns       580088 ns         1175
PerformanceTestExecutorMultipleCallbackGroups/multi_thread_executor_spin_some                  6097011 ns      6096917 ns          112
PerformanceTestExecutorMultipleCallbackGroups/cbg_executor_spin_some                            749972 ns       750076 ns          905

The iRobot Benchmark results:
topology/white_mountain.json

StaticSingleThreadedExecutor
System total:
received_msgs  mean_us   late_msgs late_perc too_late_msgs  too_late_perc  lost_msgs lost_perc 
6127           12        0         0         0              0              0         
SingleThreadedExecutor
System total:
received_msgs  mean_us   late_msgs late_perc too_late_msgs  too_late_perc  lost_msgs lost_perc 
6127           14        0         0         0              0              0         0    
MultiThreadedExecutor
System total:
received_msgs  mean_us   late_msgs late_perc too_late_msgs  too_late_perc  lost_msgs lost_perc 
6127           24        0         0         0              0              0         0  
CbgSingleThreadExecutor
System total:
received_msgs  mean_us   late_msgs late_perc too_late_msgs  too_late_perc  lost_msgs lost_perc 
7219           13        0         0         0              0              0         0  
CbgMultiThreadExecutor
received_msgs  mean_us   late_msgs late_perc too_late_msgs  too_late_perc  lost_msgs lost_perc 
7216           15        0         0         0              0              0         0

Note, there is something fishy going on, as the CbgExecutor receives more messages than the normal one; I am still looking into this.

@mjcarroll @wjwwood @clalancette @fujitatomoya Can you have a short look at this?

@jmachowinski (Contributor, Author) commented Jan 29, 2024

I worked further on this, and the executor is now usable and runs stably in our system.

I did a few performance measurements and came out ahead of the SingleThreaded and the MultiThreaded executors.
For comparison I used rclcpp's benchmark_executors and the iRobot benchmark.

Removed the benchmark results, as they were taken with the CPU frequency governor not in performance mode. Please refer to the ones above.

Janosch Machowinski and others added 4 commits January 31, 2024 20:19
…cuters

This allows us to use executors that are not derived from rclcpp::Executor.

Signed-off-by: Janosch Machowinski <[email protected]>
Signed-off-by: Janosch Machowinski <[email protected]>

# Conflicts:
#	rclcpp/include/rclcpp/executor.hpp
#	rclcpp/src/rclcpp/executor.cpp
#	rclcpp/test/rclcpp/executors/test_executors.cpp
@jmachowinski jmachowinski changed the title DRAFT CallbackGroup centric executor CallbackGroup centric executor Feb 3, 2024
@alsora (Collaborator) commented Feb 5, 2024

Note, there is something fishy going on, as the CbgExecutor receives more messages than the normal one

The likely explanation is that publishers/timers are running faster (thanks to the improved performance of the new executor) so you are publishing more messages.

I would be interested in seeing a performance comparison with the EventsExecutor, which is considerably faster than the other executors you tested with.
IMO if this proposed executor does not have better performance than the EventsExecutor, we should not add it to rclcpp.
We are already looking to unify the existing executors because there's no reason to have so many, so a new executor should be added only if it's superior in terms of performance.

Moreover, with a new executor proposal, we should have a detailed performance analysis and design document accompanying it.
The new executor should initially either be implemented in a separate repository or in the experimental namespace/directory.

NOTE: I didn't look at the code yet.
I'll try to do a review this week.

@jmachowinski (Contributor, Author)

The likely explanation is that publishers/timers are running faster (thanks to the improved performance of the new executor) so you are publishing more messages.

That would be nice, but I think something is broken with the shutdown.

I would be interested in seeing a performance comparison with the EventsExecutor, which is considerably faster than the other executors you tested with.

Define 'faster' :-). The latency of the EventsExecutor is way better.

IMO if this proposed executor does not have better performance than the EventsExecutor, we should not add it to rclcpp.

The main point of this executor is that it has similar behavior in single-threaded and multi-threaded mode. Note that this is not the case for the current executors. That it is fast and shows a reduction of CPU load is a nice add-on.

Moreover, with a new executor proposal, we should have a detailed performance analysis and design document accompanying it. The new executor should initially either be implemented in a separate repository or in the experimental namespace/directory.

I changed the design 4 times during the refinement of the approach... This started out as a request for feedback on an idea and has meanwhile evolved into a full-blown, reasonably stable executor. I would first like to discuss it, and whether I can get the changes needed for it into rclcpp and rcl, before putting any work into design documents...

NOTE: I didn't look at the code yet. I'll try to do a review this week.

That would be great, thanks

@alsora (Collaborator) commented Feb 5, 2024

The main point of this executor is that it has similar behavior in single and multithreaded mode. Note, this is not the case for the current executors. That it is fast, and shows a reduction of CPU load is a nice addon.

This makes sense.
However I think that the current landscape of executors is already quite confusing for a user.

Usually the path forward for a new executor is the following

  • create a dedicated repository for this new executor (outside of rclcpp)
  • create design documents
  • gather community interest
  • eventually push to rclcpp if there's enough interest

This PR can stay open (maybe as a draft) to get initial feedback from the maintainers, but I would prefer to start with a design document to fully understand the theory of operation before diving into the code.

If the purpose of this executor is to "fix" a bug in the current single and multithreaded executors, I would rather ask to replace them and/or apply the changes to those executors themselves rather than creating a new one.

On the other hand, if "having a similar behavior in single and multithreaded mode" isn't really a bug-fix, but rather an alternative approach (similar to how the events executor processes events in a different order than the single threaded executor), we could consider adding this new executor after the community has shown enough interest in it (e.g. contributions, discourse discussions, etc.)

Define 'faster' :-). The latency of the EventsExecutor is way better.

I used "faster" on purpose as a very generic term. We showed how the events executor had better performance than the standard executor across a large number of tests and metrics (CPU usage, latency, throughput, accuracy of timers, and slightly even RAM usage).
If a similar case could be made for a new executor, I would recommend the process described before, with the goal of replacing the events-executor (which is still marked as "experimental").

We currently have 4 executors in the repo. I really don't want to go to 5 unless there's a very strong reason.
We had plans to reduce the number of executors (fully deprecate the StaticSingleThreadedExecutor, move the EventsExecutor out of ::experimental and make it the default, eventually remove the SingleThreadedExecutor), so it would be great if new executor proposals fit into this schema of "reducing the complexity for users".

@jmachowinski (Contributor, Author) commented Feb 5, 2024

This PR can stay open (maybe as a draft) to get initial feedbacks from the maintainers, but I would prefer to start with a design document to fully understand the theory of operation before diving into the code.

I'll try to make some time to come up with a document.

If the purpose of this executor is to "fix" a bug in the current single and multithreaded executors, I would rather ask to replace them and/or apply the changes to those executors themselves rather than creating a new one.

Replacing the Static, Single and MultiThreadedExecutor with this one would be feasible, but also a bit drastic. My idea was to add it as a possible replacement for transition and testing.
From my point of view this is a needed bugfix, as the MultiThreadedExecutor caused a lot of headaches for us in the past year. But it is also a new approach in terms of the internal workings.

I had a short look at the EventsExecutor and would say that if we merge it with this approach, we will get something really nice. The EventsExecutor has the solution for how to avoid the waitsets, which take ages to build. On the other side, the cbg executor has the logic to schedule ready events of callback groups in prioritized order across multiple threads.

The result should be fast, but behave similarly to the current executors.

@jmachowinski jmachowinski marked this pull request as draft February 6, 2024 10:27
@jmachowinski (Contributor, Author)

@alsora I reworked the description to contain a proposal and a detailed description of the issues it should fix.

I would be fine with having this executor outside of rclcpp, but as mentioned above, I will need to adapt some APIs and structures slightly in rcl and rclcpp for this.

As for my idea on how to combine this and the EventsExecutor approach, it's basically 'take the wait-for-work logic out of the spin function and fill the event queues from RMW callbacks'. The housekeeping might be challenging though.

@mjcarroll (Member)

Many ideas here, thanks for doing the leg-work and sorry for taking so long to begin thinking about this.

One note, and something that @wjwwood keeps reminding me about regarding our current implementation:

This will clear the global internal event queue (within AllocatorMemoryStrategy) and repopulate it with ready events.

This isn't so much a queue as a set. Basically, everything has a boolean state of "am I ready or not" rather than "how many things do I have ready". In the case that you had a subscription that received 1 message during the wait and one that received 100 messages, both will be processed equally (one callback per subscription).
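
Schematically (not actual rclcpp code), the wait-based collection amounts to something like:

#include <unordered_set>

// Membership means "this handle had at least one item pending when we
// waited"; how many items were pending is not recorded. As a result, one
// wait/collect cycle executes at most one callback per ready subscription.
struct ReadySet
{
  std::unordered_set<const void *> ready_subscription_handles;
};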

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants